%%--------------------------------------------------------------------
%% Copyright (c) 2018-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------

-module(emqx_broker_SUITE).

-compile(export_all).
-compile(nowarn_export_all).

-define(APP, emqx).

-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").

-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").

%% Common Test entry point: the generic broker cases run first, followed
%% by the connected-client-count cases (see groups/0 for the layout).
all() ->
    [{group, all_cases}, {group, connected_client_count_group}].

%% Group layout.  The connected-client-count cases are listed explicitly
%% and run once in the `tcp' group and once in the `ws' group; whatever
%% else emqx_ct:all/1 returns for this module goes into `all_cases'.
groups() ->
    CountCases = [ t_connected_client_count_persistent
                 , t_connected_client_count_anonymous
                 , t_connected_client_count_transient_takeover
                 , t_connected_client_stats
                 ],
    GenericCases = emqx_ct:all(?MODULE) -- CountCases,
    [ {all_cases, [], GenericCases}
    , {connected_client_count_group, [{group, tcp}, {group, ws}]}
    , {tcp, [], CountCases}
    , {ws, [], CountCases}
    ].

%% Per-group setup.
%%
%% The umbrella group `connected_client_count_group' starts nothing and
%% returns Config untouched.  Every other group boots the modules and
%% starts the applications; `tcp' and `ws' additionally prepend the
%% connection settings the test cases read from Config.
init_per_group(connected_client_count_group, Config) ->
    Config;
init_per_group(Group, Config) ->
    emqx_ct_helpers:boot_modules(all),
    emqx_ct_helpers:start_apps([]),
    case Group of
        tcp ->
            [{conn_fun, connect} | Config];
        ws ->
            [ {ssl, false}
            , {enable_websocket, true}
            , {conn_fun, ws_connect}
            , {port, 8083}
            , {host, "localhost"}
            | Config
            ];
        _Other ->
            Config
    end.

%% Per-group teardown: the umbrella group has nothing to stop; every
%% other group stops the applications.
end_per_group(Group, _Config) ->
    case Group of
        connected_client_count_group ->
            ok;
        _Other ->
            emqx_ct_helpers:stop_apps([])
    end.

%% Suite-level setup is a no-op: Config is passed through unchanged.
init_per_suite(Config) ->
    Config.

%% Suite-level teardown is a no-op.
end_per_suite(_Config) ->
    ok.

%% Each test case function doubles as its own fixture: it is called with
%% `{init, Config}' here and must return the Config for the case.
init_per_testcase(Case, Config) ->
    SetupArg = {init, Config},
    ?MODULE:Case(SetupArg).

%% Counterpart of init_per_testcase/2: the test case function is called
%% with `{'end', Config}' to clean up after itself.
end_per_testcase(Case, Config) ->
    TeardownArg = {'end', Config},
    ?MODULE:Case(TeardownArg).

%%--------------------------------------------------------------------
%% PubSub Test
%%--------------------------------------------------------------------

%% subscribed/2 reports false for `undefined' and true for the process
%% that subscribed to the topic during setup.
t_subscribed({init, Config}) ->
    Topic = <<"topic">>,
    emqx_broker:subscribe(Topic),
    Config;
t_subscribed({'end', _Config}) ->
    emqx_broker:unsubscribe(<<"topic">>);
t_subscribed(Config) when is_list(Config) ->
    Topic = <<"topic">>,
    ?assertEqual(false, emqx_broker:subscribed(undefined, Topic)),
    ?assertEqual(true, emqx_broker:subscribed(self(), Topic)).

%% Same as t_subscribed, but the subscription is made with an explicit
%% subscriber id; the subscribing process must still be reported.
t_subscribed_2({init, Config}) ->
    Topic = <<"topic">>,
    SubId = <<"clientid">>,
    emqx_broker:subscribe(Topic, SubId),
    Config;
t_subscribed_2({'end', _Config}) ->
    emqx_broker:unsubscribe(<<"topic">>);
t_subscribed_2(Config) when is_list(Config) ->
    IsSubscribed = emqx_broker:subscribed(self(), <<"topic">>),
    ?assertEqual(true, IsSubscribed).

%% Subscription options can be read by pid or by subscriber id, are
%% replaced on re-subscribe, and can be changed with set_subopts/2.
t_subopts({init, Config}) ->
    Config;
t_subopts({'end', _Config}) ->
    emqx_broker:unsubscribe(<<"topic">>);
t_subopts(Config) when is_list(Config) ->
    Topic = <<"topic">>,
    ClientId = <<"clientid">>,
    %% the full option map expected back for a given QoS
    Expected = fun(QoS) ->
                       #{nl => 0, qos => QoS, rap => 0, rh => 0, subid => ClientId}
               end,
    %% before subscribing there is nothing to set or get
    ?assertEqual(false, emqx_broker:set_subopts(Topic, #{qos => 1})),
    ?assertEqual(undefined, emqx_broker:get_subopts(self(), Topic)),
    ?assertEqual(undefined, emqx_broker:get_subopts(ClientId, Topic)),
    %% first subscription
    emqx_broker:subscribe(Topic, ClientId, #{qos => 1}),
    timer:sleep(200),
    ?assertEqual(Expected(1), emqx_broker:get_subopts(self(), Topic)),
    ?assertEqual(Expected(1), emqx_broker:get_subopts(ClientId, Topic)),
    %% subscribing again with a different QoS
    emqx_broker:subscribe(Topic, ClientId, #{qos => 2}),
    ?assertEqual(Expected(2), emqx_broker:get_subopts(self(), Topic)),
    %% explicit update
    ?assertEqual(true, emqx_broker:set_subopts(Topic, #{qos => 0})),
    ?assertEqual(Expected(0), emqx_broker:get_subopts(self(), Topic)).

%% Every topic subscribed to in the test body must be listed by
%% emqx_broker:topics/0.  The topic list travels through Config so that
%% the teardown clause can unsubscribe from the same topics.
t_topics({init, Config}) ->
    [{topics, [<<"topic">>, <<"topic/1">>, <<"topic/2">>]} | Config];
t_topics({'end', Config}) ->
    Subscribed = proplists:get_value(topics, Config),
    lists:foreach(fun emqx_broker:unsubscribe/1, Subscribed);
t_topics(Config) when is_list(Config) ->
    Wanted = [First, Second, Third] = proplists:get_value(topics, Config),
    ok = emqx_broker:subscribe(First, <<"clientId">>),
    ok = emqx_broker:subscribe(Second, <<"clientId">>),
    ok = emqx_broker:subscribe(Third, <<"clientId">>),
    Known = emqx_broker:topics(),
    AllKnown = lists:all(fun(Topic) -> lists:member(Topic, Known) end, Wanted),
    ?assertEqual(true, AllKnown).

%% After the setup clause subscribes, the test process must be the only
%% subscriber returned for the topic.
t_subscribers({init, Config}) ->
    Topic = <<"topic">>,
    SubId = <<"clientid">>,
    emqx_broker:subscribe(Topic, SubId),
    Config;
t_subscribers({'end', _Config}) ->
    emqx_broker:unsubscribe(<<"topic">>);
t_subscribers(Config) when is_list(Config) ->
    Subscribers = emqx_broker:subscribers(<<"topic">>),
    ?assertEqual([self()], Subscribers).

%% subscriptions/1 must return the same option map for the topic whether
%% it is queried by pid or by subscriber id.
t_subscriptions({init, Config}) ->
    emqx_broker:subscribe(<<"topic">>, <<"clientid">>, #{qos => 1}),
    Config;
t_subscriptions({'end', _Config}) ->
    emqx_broker:unsubscribe(<<"topic">>);
t_subscriptions(Config) when is_list(Config) ->
    ct:sleep(100),
    Topic = <<"topic">>,
    Expected = #{nl => 0, qos => 1, rap => 0, rh => 0, subid => <<"clientid">>},
    ByPid = emqx_broker:subscriptions(self()),
    ?assertEqual(Expected, proplists:get_value(Topic, ByPid)),
    BySubId = emqx_broker:subscriptions(<<"clientid">>),
    ?assertEqual(Expected, proplists:get_value(Topic, BySubId)).

%% A message published to a topic the test process subscribed to must be
%% delivered to it as `{deliver, Topic, #message{}}'.
t_sub_pub({init, Config}) ->
    ok = emqx_broker:subscribe(<<"topic">>),
    Config;
t_sub_pub({'end', _Config}) ->
    ok = emqx_broker:unsubscribe(<<"topic">>);
t_sub_pub(Config) when is_list(Config) ->
    ct:sleep(100),
    Hello = emqx_message:make(ct, <<"topic">>, <<"hello">>),
    emqx_broker:safe_publish(Hello),
    %% the first message in the mailbox decides: anything other than the
    %% expected delivery (or nothing within 100 ms) fails the case
    Delivered =
        receive
            {deliver, <<"topic">>, #message{payload = <<"hello">>}} -> true;
            _ -> false
        after 100 ->
            false
        end,
    ?assert(Delivered).

%% Publishing to a topic nobody subscribed to bumps the
%% 'messages.dropped' metric from 0 to 1.
t_nosub_pub({init, Config}) ->
    Config;
t_nosub_pub({'end', _Config}) ->
    ok;
t_nosub_pub(Config) when is_list(Config) ->
    ?assertEqual(0, emqx_metrics:val('messages.dropped')),
    Orphan = emqx_message:make(ct, <<"topic">>, <<"hello">>),
    emqx_broker:publish(Orphan),
    ?assertEqual(1, emqx_metrics:val('messages.dropped')).

%% A shared subscription (group "group") made by the test process must
%% still receive a message published to the plain topic.
t_shared_subscribe({init, Config}) ->
    SubOpts = #{share => <<"group">>},
    emqx_broker:subscribe(<<"topic">>, <<"clientid">>, SubOpts),
    ct:sleep(100),
    Config;
t_shared_subscribe({'end', _Config}) ->
    emqx_broker:unsubscribe(<<"$share/group/topic">>);
t_shared_subscribe(Config) when is_list(Config) ->
    Hello = emqx_message:make(ct, <<"topic">>, <<"hello">>),
    emqx_broker:safe_publish(Hello),
    Delivered =
        receive
            {deliver, <<"topic">>, #message{payload = <<"hello">>}} ->
                true;
            Unexpected ->
                %% log what arrived instead, to ease debugging
                ct:pal("Msg: ~p", [Unexpected]),
                false
        after 100 ->
            false
        end,
    ?assert(Delivered).

%% Two clients in two different share groups on the same topic: one
%% publish must yield two deliveries, both carrying the published
%% payload and topic.
t_shared_subscribe_2({init, Config}) ->
    Config;
t_shared_subscribe_2({'end', _Config}) ->
    ok;
t_shared_subscribe_2(_) ->
    {ok, Client1} = emqtt:start_link([{clean_start, true}, {clientid, <<"clientid">>}]),
    {ok, _} = emqtt:connect(Client1),
    {ok, _, [0]} = emqtt:subscribe(Client1, <<"$share/group/topic">>, 0),

    {ok, Client2} = emqtt:start_link([{clean_start, true}, {clientid, <<"clientid2">>}]),
    {ok, _} = emqtt:connect(Client2),
    {ok, _, [0]} = emqtt:subscribe(Client2, <<"$share/group2/topic">>, 0),

    ct:sleep(10),
    ok = emqtt:publish(Client1, <<"topic">>, <<"hello">>, 0),
    Received = recv_msgs(2),
    ?assertEqual(2, length(Received)),
    IsHello = fun(#{payload := <<"hello">>, topic := <<"topic">>}) -> true;
                 (_) -> false
              end,
    ?assertEqual(true, lists:all(IsHello, Received)),
    emqtt:disconnect(Client1),
    emqtt:disconnect(Client2).

%% Two clients in the SAME share group on the same topic: one publish
%% must be delivered exactly once.
t_shared_subscribe_3({init, Config}) ->
    Config;
t_shared_subscribe_3({'end', _Config}) ->
    ok;
t_shared_subscribe_3(_) ->
    SharedTopic = <<"$share/group/topic">>,
    {ok, Client1} = emqtt:start_link([{clean_start, true}, {clientid, <<"clientid">>}]),
    {ok, _} = emqtt:connect(Client1),
    {ok, _, [0]} = emqtt:subscribe(Client1, SharedTopic, 0),

    {ok, Client2} = emqtt:start_link([{clean_start, true}, {clientid, <<"clientid2">>}]),
    {ok, _} = emqtt:connect(Client2),
    {ok, _, [0]} = emqtt:subscribe(Client2, SharedTopic, 0),

    ct:sleep(10),
    ok = emqtt:publish(Client1, <<"topic">>, <<"hello">>, 0),
    %% ask for up to two messages; only one is expected to arrive
    Received = recv_msgs(2),
    ?assertEqual(1, length(Received)),
    emqtt:disconnect(Client1),
    emqtt:disconnect(Client2).

%% Delivery must still work when emqx_broker_helper:get_sub_shard/2 is
%% mocked to always return shard 1.
t_shard({init, Config}) ->
    ok = meck:new(emqx_broker_helper, [passthrough, no_history]),
    AlwaysShardOne = fun(_, _) -> 1 end,
    ok = meck:expect(emqx_broker_helper, get_sub_shard, AlwaysShardOne),
    emqx_broker:subscribe(<<"topic">>, <<"clientid">>),
    Config;
t_shard({'end', _Config}) ->
    emqx_broker:unsubscribe(<<"topic">>),
    ok = meck:unload(emqx_broker_helper);
t_shard(Config) when is_list(Config) ->
    ct:sleep(100),
    Hello = emqx_message:make(ct, <<"topic">>, <<"hello">>),
    emqx_broker:safe_publish(Hello),
    Delivered =
        receive
            {deliver, <<"topic">>, #message{payload = <<"hello">>}} -> true;
            _ -> false
        after 100 ->
            false
        end,
    ?assert(Delivered).

%% emqx_broker:stats_fun/0 must publish subscriber / subscription /
%% suboption counts (and their max values) to emqx_stats.
%%
%% Setup waits (up to 5 s, polling every 100 ms in a linked helper
%% process) until the three counters have dropped to zero, so that the
%% exact values asserted in the test body are meaningful.
t_stats_fun({init, Config}) ->
    TestPid = self(),
    Poll = fun Wait() ->
                   Total = emqx_stats:getstat('subscribers.count')
                         + emqx_stats:getstat('subscriptions.count')
                         + emqx_stats:getstat('suboptions.count'),
                   case Total of
                       0 ->
                           TestPid ! {ready, self()},
                           exit(normal);
                       _ ->
                           %% not settled yet: retry unless told to stop
                           receive
                               stop -> exit(normal)
                           after 100 ->
                               Wait()
                           end
                   end
           end,
    Poller = spawn_link(Poll),
    receive
        {ready, Poller} ->
            Config
    after 5000 ->
        Poller ! stop,
        ct:fail("timedout_waiting_for_sub_stats_to_reach_zero")
    end;
t_stats_fun({'end', _Config}) ->
    ok = emqx_broker:unsubscribe(<<"topic">>),
    ok = emqx_broker:unsubscribe(<<"topic2">>);
t_stats_fun(Config) when is_list(Config) ->
    SubId = <<"clientid">>,
    ok = emqx_broker:subscribe(<<"topic">>, SubId),
    ok = emqx_broker:subscribe(<<"topic2">>, SubId),
    %% force a stats refresh
    emqx_broker:stats_fun(),
    %% the stats are written with a gen_server cast, so round-trip a
    %% synchronous call through emqx_stats to be sure the cast was handled
    ignored = gen_server:call(emqx_stats, call, infinity),
    ?assertEqual(2, emqx_stats:getstat('subscribers.count')),
    ?assertEqual(2, emqx_stats:getstat('subscribers.max')),
    ?assertEqual(2, emqx_stats:getstat('subscriptions.count')),
    ?assertEqual(2, emqx_stats:getstat('subscriptions.max')),
    ?assertEqual(2, emqx_stats:getstat('suboptions.count')),
    ?assertEqual(2, emqx_stats:getstat('suboptions.max')).

%% Persistent sessions (clean_start = false), once their connection is
%% gone, do not contribute to the connected client count.
%%
%% Every step runs its action through wait_for_events/2,3, which returns
%% `{ActionResult, {ok, Events}}' once the listed snabbkaffe events have
%% been seen (or `{ActionResult, timeout}' otherwise), so each match
%% below asserts both the action result and that the events arrived.
t_connected_client_count_persistent({init, Config}) ->
    ok = snabbkaffe:start_trace(),
    %% the emqtt clients are linked to the test process (start_link);
    %% trap exits so a dying client does not take the test process down
    process_flag(trap_exit, true),
    Config;
t_connected_client_count_persistent(Config) when is_list(Config) ->
    %% connect function name chosen by the group (see init_per_group/2)
    ConnFun = ?config(conn_fun, Config),
    ClientID = <<"clientid">>,
    ?assertEqual(0, emqx_cm:get_connected_client_count()),
    %% Config is appended to the emqtt options so that group-level
    %% connection settings reach the client
    {ok, ConnPid0} = emqtt:start_link([ {clean_start, false}
                                      , {clientid, ClientID}
                                      | Config]),
    %% connecting increments the count
    {{ok, _}, {ok, [_]}} = wait_for_events(
                             fun() -> emqtt:ConnFun(ConnPid0) end,
                             [emqx_cm_connected_client_count_inc]
                            ),
    ?assertEqual(1, emqx_cm:get_connected_client_count()),
    %% a clean disconnect decrements it again
    {ok, {ok, [_]}} = wait_for_events(
                        fun() -> emqtt:disconnect(ConnPid0) end,
                        [emqx_cm_connected_client_count_dec]
                       ),
    ?assertEqual(0, emqx_cm:get_connected_client_count()),
    %% reconnecting
    {ok, ConnPid1} = emqtt:start_link([ {clean_start, false}
                                      , {clientid, ClientID}
                                      | Config
                                      ]),
    {{ok, _}, {ok, [_]}} = wait_for_events(
                             fun() -> emqtt:ConnFun(ConnPid1) end,
                             [emqx_cm_connected_client_count_inc]
                            ),
    ?assertEqual(1, emqx_cm:get_connected_client_count()),
    %% taking over: a second client with the same client id connects
    %% while the first is still connected; two events (inc/dec kinds) are
    %% awaited and the count must end up at 1
    {ok, ConnPid2} = emqtt:start_link([ {clean_start, false}
                                      , {clientid, ClientID}
                                      | Config
                                      ]),
    {{ok, _}, {ok, [_, _]}} = wait_for_events(
                             fun() -> emqtt:ConnFun(ConnPid2) end,
                             [ emqx_cm_connected_client_count_inc
                             , emqx_cm_connected_client_count_dec
                             ],
                             500
                            ),
    ?assertEqual(1, emqx_cm:get_connected_client_count()),
    %% abnormal exit of channel process
    ChanPids = emqx_cm:all_channels(),
    {ok, {ok, [_, _]}} = wait_for_events(
                           fun() ->
                                   lists:foreach(
                                     fun(ChanPid) -> exit(ChanPid, kill) end,
                                     ChanPids)
                           end,
                           [ emqx_cm_connected_client_count_dec
                           , emqx_cm_process_down
                           ]
                          ),
    ?assertEqual(0, emqx_cm:get_connected_client_count()),
    ok;
t_connected_client_count_persistent({'end', _Config}) ->
    snabbkaffe:stop(),
    ok.

%% Connections without a client id also contribute to the connected
%% client count.
%%
%% No `clientid' option is passed to emqtt here.  Each step is wrapped in
%% wait_for_events/2, which returns `{ActionResult, {ok, Events}}' when
%% the expected snabbkaffe events were observed.
t_connected_client_count_anonymous({init, Config}) ->
    ok = snabbkaffe:start_trace(),
    %% clients are linked to the test process; trap their exits
    process_flag(trap_exit, true),
    Config;
t_connected_client_count_anonymous(Config) when is_list(Config) ->
    %% connect function name chosen by the group (see init_per_group/2)
    ConnFun = ?config(conn_fun, Config),
    ?assertEqual(0, emqx_cm:get_connected_client_count()),
    %% first client
    {ok, ConnPid0} = emqtt:start_link([ {clean_start, true}
                                      | Config]),
    {{ok, _}, {ok, [_]}} = wait_for_events(
                             fun() -> emqtt:ConnFun(ConnPid0) end,
                             [emqx_cm_connected_client_count_inc]
                            ),
    ?assertEqual(1, emqx_cm:get_connected_client_count()),
    %% second client
    {ok, ConnPid1} = emqtt:start_link([ {clean_start, true}
                                      | Config]),
    {{ok, _}, {ok, [_]}} = wait_for_events(
                             fun() -> emqtt:ConnFun(ConnPid1) end,
                             [emqx_cm_connected_client_count_inc]
                            ),
    ?assertEqual(2, emqx_cm:get_connected_client_count()),
    %% when first client disconnects, shouldn't affect the second
    {ok, {ok, [_, _]}} = wait_for_events(
                        fun() -> emqtt:disconnect(ConnPid0) end,
                        [ emqx_cm_connected_client_count_dec
                        , emqx_cm_process_down
                        ]
                       ),
    ?assertEqual(1, emqx_cm:get_connected_client_count()),
    %% reconnecting (a third, new anonymous client)
    {ok, ConnPid2} = emqtt:start_link([ {clean_start, true}
                                      | Config
                                      ]),
    {{ok, _}, {ok, [_]}} = wait_for_events(
                             fun() -> emqtt:ConnFun(ConnPid2) end,
                             [emqx_cm_connected_client_count_inc]
                            ),
    ?assertEqual(2, emqx_cm:get_connected_client_count()),
    %% disconnecting the second client leaves only the third one
    {ok, {ok, [_, _]}} = wait_for_events(
                           fun() -> emqtt:disconnect(ConnPid1) end,
                           [ emqx_cm_connected_client_count_dec
                           , emqx_cm_process_down
                           ]
                          ),
    ?assertEqual(1, emqx_cm:get_connected_client_count()),
    %% abnormal exit of channel process
    Chans = emqx_cm:all_channels(),
    {ok, {ok, [_, _]}} = wait_for_events(
                           fun() ->
                                   lists:foreach(
                                     fun(ChanPid) -> exit(ChanPid, kill) end,
                                     Chans)
                           end,
                           [ emqx_cm_connected_client_count_dec
                           , emqx_cm_process_down
                           ]
                          ),
    ?assertEqual(0, emqx_cm:get_connected_client_count()),
    ok;
t_connected_client_count_anonymous({'end', _Config}) ->
    snabbkaffe:stop(),
    ok.

%% Many clean-start clients racing for the same client id must leave the
%% connected client count consistent: back at 0 once the racing clients
%% are gone, 1 after a fresh connect, and 0 after the channel is killed.
t_connected_client_count_transient_takeover({init, Config}) ->
    ok = snabbkaffe:start_trace(),
    %% ConnPid1 below is linked to the test process; trap its exit
    process_flag(trap_exit, true),
    Config;
t_connected_client_count_transient_takeover(Config) when is_list(Config) ->
    %% connect function name chosen by the group (see init_per_group/2)
    ConnFun = ?config(conn_fun, Config),
    ClientID = <<"clientid">>,
    ?assertEqual(0, emqx_cm:get_connected_client_count()),
    %% we spawn several clients simultaneously to cause the race
    %% condition for the client id lock
    NumClients = 20,
    {ok, {ok, [_, _]}} =
        wait_for_events(
          fun() ->
                  lists:foreach(
                    fun(_) ->
                            %% each client is started (and linked) from its
                            %% own short-lived process
                            spawn(
                              fun() ->
                                      {ok, ConnPid} =
                                          emqtt:start_link([ {clean_start, true}
                                                           , {clientid, ClientID}
                                                           | Config]),
                                      %% don't assert the result: most of them fail
                                      %% during the race
                                      emqtt:ConnFun(ConnPid),
                                      ok
                              end),
                            ok
                    end,
                    lists:seq(1, NumClients))
          end,
          %% there can be only one channel that wins the race for the
          %% lock for this client id.  we also expect a decrement
          %% event because the client dies along with the ephemeral
          %% process.
          [ emqx_cm_connected_client_count_inc
          , emqx_cm_connected_client_count_dec
          ],
          1000),
    %% Since more than one pair of inc/dec may be emitted, we need to
    %% wait for full stabilization
    timer:sleep(100),
    %% It must be 0 again because the clients were started with
    %% emqtt:start_link/1 inside the ephemeral processes spawned above,
    %% and all should be dead now.
    ?assertEqual(0, emqx_cm:get_connected_client_count()),
    %% connecting again
    {ok, ConnPid1} = emqtt:start_link([ {clean_start, true}
                                      , {clientid, ClientID}
                                      | Config
                                      ]),
    {{ok, _}, {ok, [_]}} =
        wait_for_events(
          fun() -> emqtt:ConnFun(ConnPid1) end,
          [emqx_cm_connected_client_count_inc]
         ),
    ?assertEqual(1, emqx_cm:get_connected_client_count()),
    %% abnormal exit of channel process
    %% (the match also asserts that exactly one channel exists)
    [ChanPid] = emqx_cm:all_channels(),
    {ok, {ok, [_, _]}} =
        wait_for_events(
          fun() ->
                  exit(ChanPid, kill),
                  ok
          end,
          [ emqx_cm_connected_client_count_dec
          , emqx_cm_process_down
          ]
         ),
    ?assertEqual(0, emqx_cm:get_connected_client_count()),
    ok;
t_connected_client_count_transient_takeover({'end', _Config}) ->
    snabbkaffe:stop(),
    ok.

%% The 'live_connections.count' / 'live_connections.max' stats must
%% follow the connected client count: count goes 0 -> 1 -> 0 across a
%% connect/disconnect, while max stays at 1 after the disconnect.
t_connected_client_stats({init, Config}) ->
    %% restart the emqx_stats child before the case
    %% NOTE(review): presumably to start from fresh stat values - confirm
    ok = supervisor:terminate_child(emqx_kernel_sup, emqx_stats),
    {ok, _} = supervisor:restart_child(emqx_kernel_sup, emqx_stats),
    ok = snabbkaffe:start_trace(),
    Config;
t_connected_client_stats(Config) when is_list(Config) ->
    %% connect function name chosen by the group (see init_per_group/2)
    ConnFun = ?config(conn_fun, Config),
    ?assertEqual(0, emqx_cm:get_connected_client_count()),
    ?assertEqual(0, emqx_stats:getstat('live_connections.count')),
    ?assertEqual(0, emqx_stats:getstat('live_connections.max')),
    {ok, ConnPid} = emqtt:start_link([ {clean_start, true}
                                     , {clientid, <<"clientid">>}
                                     | Config
                                     ]),
    {{ok, _}, {ok, [_]}} = wait_for_events(
                             fun() -> emqtt:ConnFun(ConnPid) end,
                             [emqx_cm_connected_client_count_inc]
                            ),
    %% ensure stats are synchronized: run emqx_cm:stats_fun/0 and wait
    %% for the matching emqx_stats_setstat trace event
    {_, {ok, [_]}} = wait_for_stats(
                       fun emqx_cm:stats_fun/0,
                       [#{count_stat => 'live_connections.count',
                          max_stat => 'live_connections.max'}]
                      ),
    ?assertEqual(1, emqx_stats:getstat('live_connections.count')),
    ?assertEqual(1, emqx_stats:getstat('live_connections.max')),
    {ok, {ok, [_]}} = wait_for_events(
                        fun() -> emqtt:disconnect(ConnPid) end,
                        [emqx_cm_connected_client_count_dec]
                       ),
    %% ensure stats are synchronized
    {_, {ok, [_]}} = wait_for_stats(
                       fun emqx_cm:stats_fun/0,
                       [#{count_stat => 'live_connections.count',
                          max_stat => 'live_connections.max'}]
                      ),
    %% the count drops back to 0, the max keeps the value 1
    ?assertEqual(0, emqx_stats:getstat('live_connections.count')),
    ?assertEqual(1, emqx_stats:getstat('live_connections.max')),
    ok;
t_connected_client_stats({'end', _Config}) ->
    ok = snabbkaffe:stop(),
    %% restart emqx_stats again after the case
    ok = supervisor:terminate_child(emqx_kernel_sup, emqx_stats),
    {ok, _} = supervisor:restart_child(emqx_kernel_sup, emqx_stats),
    ok.

%% The connected client count must never become negative: marking a
%% channel that was never counted as disconnected must leave the count
%% at 0, so that a following "connected" mark brings it to 1.
t_connect_client_never_negative({init, Config}) ->
    Config;
t_connect_client_never_negative({'end', _Config}) ->
    ok;
t_connect_client_never_negative(Config) when is_list(Config) ->
    FakeChan = list_to_pid("<0.0.1>"),
    ?assertEqual(0, emqx_cm:get_connected_client_count()),
    %% this would take the count to -1 if it were not clamped
    emqx_cm:mark_channel_disconnected(FakeChan),
    ?assertEqual(0, emqx_cm:get_connected_client_count()),
    %% had the count really dropped to -1, it would now read 0
    emqx_cm:mark_channel_connected(FakeChan),
    ?assertEqual(1, emqx_cm:get_connected_client_count()),
    ok.

%% With anonymous access disabled, a rejected CONNECT must be counted in
%% the 'packets.connack.auth_error' metric for both MQTT v4 and v5
%% clients.
%%
%% Setup restarts the applications with a stricter emqx environment;
%% teardown restarts them once more with the default environment.
t_connack_auth_error({init, Config}) ->
    %% the rejected clients are linked to the test process; trap exits
    process_flag(trap_exit, true),
    emqx_ct_helpers:stop_apps([]),
    emqx_ct_helpers:boot_modules(all),
    DenyAnonymous =
        fun(App) ->
                case App of
                    emqx ->
                        application:set_env(emqx, acl_nomatch, deny),
                        application:set_env(emqx, allow_anonymous, false),
                        application:set_env(emqx, enable_acl_cache, false),
                        ok;
                    _Other ->
                        ok
                end
        end,
    emqx_ct_helpers:start_apps([], DenyAnonymous),
    Config;
t_connack_auth_error({'end', _Config}) ->
    emqx_ct_helpers:stop_apps([]),
    emqx_ct_helpers:boot_modules(all),
    emqx_ct_helpers:start_apps([]),
    ok;
t_connack_auth_error(Config) when is_list(Config) ->
    Metric = 'packets.connack.auth_error',
    ?assertEqual(0, emqx_metrics:val(Metric)),
    %% MQTT 3.1
    {ok, V4Client} = emqtt:start_link([{proto_ver, v4}]),
    ?assertEqual({error, {unauthorized_client, undefined}}, emqtt:connect(V4Client)),
    ?assertEqual(1, emqx_metrics:val(Metric)),
    %% MQTT 5.0
    {ok, V5Client} = emqtt:start_link([{proto_ver, v5}]),
    ?assertEqual({error, {not_authorized, #{}}}, emqtt:connect(V5Client)),
    ?assertEqual(2, emqx_metrics:val(Metric)),
    ok.

%% When a 'client.subscribe' hook flags every topic filter with
%% `deny_subscription => true', the SUBSCRIBE must be answered with
%% ?RC_UNSPECIFIED_ERROR.  The hook is always removed again, even when
%% the assertions fail.
t_handle_in_empty_client_subscribe_hook({init, Config}) ->
    Config;
t_handle_in_empty_client_subscribe_hook({'end', _Config}) ->
    ok;
t_handle_in_empty_client_subscribe_hook(Config) when is_list(Config) ->
    DenyAll = fun(_ClientInfo, _Username, Filters) ->
                      Denied = [{Topic, SubOpts#{deny_subscription => true}}
                                || {Topic, SubOpts} <- Filters],
                      {stop, Denied}
              end,
    ok = emqx:hook('client.subscribe', DenyAll, []),
    try
        {ok, Client} = emqtt:start_link(),
        {ok, _} = emqtt:connect(Client),
        {ok, _, ReasonCodes} = emqtt:subscribe(Client, <<"t">>),
        ?assertEqual([?RC_UNSPECIFIED_ERROR], ReasonCodes),
        ok
    after
        ok = emqx:unhook('client.subscribe', DenyAll)
    end.

%% Same as wait_for_events/3 with a timeout of 500 (passed through to
%% snabbkaffe_collector:subscribe/4).
wait_for_events(Action, Kinds) ->
    DefaultTimeout = 500,
    wait_for_events(Action, Kinds, DefaultTimeout).

%% Run Action() and wait for snabbkaffe trace events.
%%
%% A subscription is registered BEFORE the action runs, for as many
%% events as Kinds has elements, accepting any event whose kind is a
%% member of Kinds.
%%
%% Returns `{ActionResult, {ok, Events}}' when the events were received,
%% or `{ActionResult, timeout}' when the collector reported a timeout.
wait_for_events(Action, Kinds, Timeout) ->
    IsWanted = fun(#{?snk_kind := Kind}) -> lists:member(Kind, Kinds) end,
    {ok, Sub} = snabbkaffe_collector:subscribe(IsWanted, length(Kinds), Timeout, 0),
    ActionResult = Action(),
    Outcome = case snabbkaffe_collector:receive_events(Sub) of
                  {ok, Events} -> {ok, Events};
                  {timeout, _} -> timeout
              end,
    {ActionResult, Outcome}.

%% Run Action() and wait for `emqx_stats_setstat' trace events.
%%
%% Stats is a list of maps with `count_stat' and `max_stat' keys; an
%% event is accepted when those two fields of the event, taken together,
%% equal one of the given maps.  One event per element of Stats is
%% awaited, with a fixed timeout of 500.
%%
%% Returns `{ActionResult, {ok, Events}}' or `{ActionResult, timeout}'.
wait_for_stats(Action, Stats) ->
    StatKeys = [count_stat, max_stat],
    IsWanted = fun(Event = #{?snk_kind := emqx_stats_setstat}) ->
                       lists:member(maps:with(StatKeys, Event), Stats);
                  (_) ->
                       false
               end,
    {ok, Sub} = snabbkaffe_collector:subscribe(IsWanted, length(Stats), 500, 0),
    ActionResult = Action(),
    Outcome = case snabbkaffe_collector:receive_events(Sub) of
                  {ok, Events} -> {ok, Events};
                  {timeout, _} -> timeout
              end,
    {ActionResult, Outcome}.

%% Insert copies of an existing channel record into the
%% `emqx_channel_info' ETS table to simulate missed counts.
%%
%% The first entry of the table is used as the template and must be in
%% the `connected' state (the match crashes otherwise).  Entries
%% "fake1".."fake9" are inserted as connected; "fake10".."fake20" are
%% inserted as disconnected and should therefore not be counted.
insert_fake_channels() ->
    Tab = emqx_channel_info,
    [{_Chan, Info = #{conn_state := connected}, Stats}] =
        ets:lookup(Tab, ets:first(Tab)),
    FakeKey = fun(N) -> {"fake" ++ integer_to_list(N), undefined} end,
    Connected = [{FakeKey(N), Info, Stats} || N <- lists:seq(1, 9)],
    Disconnected = [{FakeKey(N), Info#{conn_state := disconnected}, Stats}
                    || N <- lists:seq(10, 20)],
    ets:insert(Tab, Connected),
    ets:insert(Tab, Disconnected).

%% Collect up to Count `{publish, Msg}' messages from the caller's
%% mailbox.
%%
%% Any other message is consumed and dropped.  Collection stops early
%% when nothing arrives for 100 ms.  The result lists the messages in
%% reverse order of arrival (newest first).
recv_msgs(Count) ->
    recv_msgs(Count, []).

recv_msgs(0, Acc) ->
    Acc;
recv_msgs(Remaining, Acc) ->
    receive
        {publish, Msg} ->
            recv_msgs(Remaining - 1, [Msg | Acc]);
        _Other ->
            recv_msgs(Remaining, Acc)
    after 100 ->
        Acc
    end.
